Java 集合类学习 Queue 队列
Queue 接口
Queue 表示队列,在Java中,一般是FIFO(先进先出)的队列,但也有不是先进先出的队列,比如优先级队列(堆)
介绍下 Queue 接口的基本方法:
public interface Queue<E> extends Collection<E> {
// 添加一个元素到队列,在先进先出队列中,是添加到队列尾部
// 如果由于容量不足插入失败,则抛出异常,不会返回false
boolean add(E e);
// 和add方法一样,不过不同之处在于,这个方法添加失败一般是返回false,而不是抛出异常
boolean offer(E e);
// 移除队列尾部的元素(最后进来的元素),如果队列为空,则抛出异常,不返回false
E remove();
// 和remove方法一样,不过这个方法在队列为空时会返回false,不抛出异常
E poll();
// 查看队列头部的元素(最先进来的元素),如果队列为空,则抛出异常,不返回null
E element();
// 和element方法一样,不过这个在队列为空时,返回null,不抛出异常
E peek();
}
所有队列实现类一览
- ArrayDeque, (数组双端队列)
- PriorityQueue, (优先级队列)
- ConcurrentLinkedQueue, (基于链表的并发队列)
- DelayQueue, (延期阻塞队列)(阻塞队列实现了BlockingQueue接口)
- ArrayBlockingQueue, (基于数组的并发阻塞队列)
- LinkedBlockingQueue, (基于链表的FIFO阻塞队列)
- LinkedBlockingDeque, (基于链表的FIFO双端阻塞队列)
- PriorityBlockingQueue, (带优先级的无界阻塞队列)
- SynchronousQueue (并发同步阻塞队列)
双端队列看 Deque 接口那篇笔记
add 和 offer 插入
add(E e)
和 offer(E e)
的语义相同,都是向优先队列中插入元素,只是 Queue 接口规定二者对插入失败时的处理不同,前者在插入失败时抛出 异常,后则则会返回 false。
抛出的异常如下:
IllegalStateException – 如果由于容量限制此时无法添加元素
ClassCastException – 如果指定元素的类阻止它被添加到这个队列
NullPointerException – 如果指定元素为 null 且此队列不允许 null 元素
IllegalArgumentException – 如果此元素的某些属性阻止将其添加到此队列
UnsupportedOperationException – 如果此集合不支持添加操作
element 和 peek 查看
element()
和 peek()
的语义完全相同,都是获取但不删除队首元素,也就是队列中权值最小的那个元素,二者唯一的区别是当方法失败时前者抛出异常(如果为空会抛出 NoSuchElementException 异常),后者返回 null
。
remove 和 poll 出队
remove()
和 poll()
方法的语义也完全相同,都是获取并删除队首元素,区别是当方法失败时前者抛出异常(如果为空会抛出 NoSuchElementException 异常),后者返回 null。
非线程安全的实现类
只有这个 PriorityQueue 队列是非线程安全的实现类,所以单独拿出来讲(不过人家也不完全算队列,而是 “堆” 属于优先级队列)
PriorityQueue
PriorityQueue 比较特殊,它是优先级队列,不是 FIFO 的,而是根据元素的优先级,优先级高的或者低的先出队列,因为本质是一颗特殊的完全二叉树,所以它不允许存放 null
元素
具体细节看 PriorityQueue 的那篇笔记
非线程安全的 Queue 实现类就只有这一个(不包括 Deque 的实现类,Deque 拓展了 Queue 接口),其他的大部分都是一些内部类
下面就开始讲本篇的重点:阻塞队列和非阻塞队列
阻塞队列和非阻塞队列
在并发编程中,有时候需要使用线程安全的队列。如果要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另一种是使用非阻塞算法。
在并发队列上 JDK 提供了两套实现,一个是以 ConcurrentLinkedQueue 为代表的高性能队列非阻塞,一个是以 BlockingQueue 接口为代表的阻塞队列,无论哪种都继承自 Queue。
使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。非阻塞的实现方式则可以使用循环 CAS 的方式来实现。
阻塞队列
阻塞队列(即 BlockingQueue 家族)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。
阻塞队列(Blocking queue)提供了可阻塞的 put 和 take 方法,它们与可定时的 offer 和 poll 是等价的。如果 Queue 已经满了,put 方法会被阻塞直到有空间可用;如果 Queue 是空的,那么 take 方法会被阻塞,直到有元素可用。Queue 的长度可以有限,也可以无限;无限的 Queue 永远不会充满,所以它的 put 方法永远不会阻塞。
阻塞式队列工作流程:
入列(存):阻塞式队列,如果存放的队列超出队列的总数,是时候会进行等待(阻塞)。当队列达到总数的时候,入列(生产者)会进行阻塞。这时候只有当消费者消费了队列中的队列之后,生产者才可以继续往队列中存放队列。
出列(取):如果获取队列为空的情况下,这时候也会进行等待(阻塞)。这时候队列中没有队列,消费者无法消费队列,只有生产者往对队列中存放队列之后,消费者才可以进行消费。
队列中的队列如果被消费了就会从队列中删除掉。
阻塞队列的应用场景
阻塞队列常用于生产者和消费者的场景(生产者-消费者设计模式),生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。
补充知识:生产者-消费者设计模式:该模式不会添加一个任务便立即处理,而是把任务放在于一个任务清单(TODO)中,以备后期处理。生产者-消费者模式简化了开发,因为它解除了生产者和消费者之间相互依赖的代码。生产者和消费者以不同的或者变化的速度生产和消费数据,生产者-消费者模式将这些活动解耦,因而简化了工作负荷的管理。
最常见的生产者-消费者设计是将线程池与工作队列相结合。
阻塞队列的优缺点
优点:
阻塞队列简化了消费者的编码,因为 take 会保持阻塞直到可用数据出现。但如果生产者不能足够快地产生工作,让消费者忙碌起来,那么消费者只能一直等待,直到有工作可做。同时,put 方法的阻塞特性也大大地简化了生产者的编码;如果使用一个有界队列,那么当队列充满的时候,生产者就会阻塞,暂不能生成更多的工作,从而给消费者时间来赶进进度。
有界队列(有 Max Size)是强大的资源管理工具,用来建立可靠的应用程序:它们遏制那些可以产生过多工作量、具有威胁的活动,从而让你的程序在面对超负荷工作时更加健壮。
缺点:
虽然生产者-消费者模式可以把生产者和消费者的代码相互解耦合,但是它们的行为还是间接地通过共享队列耦合在一起了
基于阻塞的算法也会带来一些活跃度失败的风险。如果线程在持有锁的时候因为阻塞 I/O,页面错误,或其他原因发生延迟,很可能所有的线程都不能前进了。
BlockingQueue 各个实现类
类库中包含一些 BlockingQueue 的实现,其中 LinkedBlockingQueue 和 ArrayBlockingQueue 是 FIFO 队列,与 LinkedList 和 ArrayList 相似,但是却拥有比同步 List 更好的并发性能。
PriorityBlockingQueue 是一个按优先级顺序排序的队列(堆)
最后一个 BlockingQueue 的实现是 SynchronousQueue,它根本上不是一个真正的队列,因为它不会为队列元素维护任何存储空间。不过,它维护一个排队的线程清单,这些线程等待把元素加入(enqueue)队列或者移出(dequeue)队列。因为 SynchronousQueue 没有存储能力,所以除非另一个线程已经准备好参与移交工作,否则 put 和 take 会一直阻止。SynchronousQueue 这类队列只有在消费者充足的时候比较合适,它们总能为下一个任务作好准备。
非阻塞队列
注意:一个线程的失败或挂起不影响其他线程的失败或挂起,这样的算法称为非阻塞(nonblocking)算法;如果算法的每一个步骤中都有一些线程能够继续执行,那么这样的算法称为锁自由(lock-free)算法。
非阻塞队列的代表 ConcurrentLinkedQueue 是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常 ConcurrentLinkedQueue 性能好于 BlockingQueue
入队和出队操作均利用 CAS(compare and set)更新,来保证元素的一致性,这样允许多个线程并发执行,并且不会因为加锁而阻塞线程,使得并发性能更好。但是 该队列不允许 null 元素。
非阻塞队列使用场景
TODO: 用到再更新...
ArrayBlockingQueue
ArrayBlockingQueue 是一个我们常用的典型的有边界(即有容量限制)的 阻塞队列,其内部的实现是基于数组来实现的,我们在创建时需要指定其长度,它的线程安全性由 ReentrantLock(可重入锁)来实现的。不允许存放 null 元素
public ArrayBlockingQueue(int capacity) {...}
public ArrayBlockingQueue(int capacity, boolean fair) {...}
如上所示,ArrayBlockingQueue 提供的构造函数中,我们需要指定队列的长度,同时我们也可以设置队列是都是公平的,当我们设置了容量后就不能再修改了,符合数组的特性,此队列按照先进先出(FIFO)的原则对元素进行排序。
和 ReentrantLock 一样,如果 ArrayBlockingQueue 被设置为非公平的,那么就存在插队的可能;如果设置为公平的,那么等待了最长时间的线程会被优先处理,其他线程不允许插队,不过这样的公平策略同时会带来一定的性能损耗,因为非公平的吞吐量通常会高于公平的情况。
LinkedBlockingQueue
LinkedBlockingQueue 是一个 阻塞队列,它既可以是有边界的,也可以是无边界的。不允许存放null元素
注意:队列容量 capacity 默认为 Integer.MAX_VALUE
,也就是默认为无边界队列
SynchronousQueue
synchronousQueue 是一个不存储任何元素的阻塞队列,每一个 put 操作必须等待 take 操作,否则不能添加元素。同时它也支持公平锁和非公平锁。
synchronousQueue 的容量并不是 1,而是 0。因为它本身不会持有任何元素,它是直接传递的,synchronousQueue 会把元素从生产者直接传递给消费者,在这个过程中能够是不需要存储的
线程池 CachedThreadPool 就是利用了该队列。Executors.newCachedThreadPool()
,因为这个线程池它的最大线程数是 Integer.MAX_VALUE
,它是更具需求来创建线程,所有的线程都是临时线程,使用完后空闲60秒则被回收,
PriorityBlockingQueue
PriorityBlockingQueue 是一个支持优先级排序的无界阻塞队列,可以通过自定义实现 compareTo()
方法来指定元素的排序规则,或者通过构造器参数 Comparator 来指定排序规则。但是需要注意插入队列的对象必须是可比较大小的,也就是 Comparable 的,否则会抛出 ClassCastException 异常。
它的 take 方法在队列为空的时候会阻塞,但是正因为它是无界队列,而且会自动扩容,所以它的队列永远不会满,所以它的 put 方法永远不会阻塞,添加操作始终都会成功
ConcurrentLinkedQueue
ConcurrentLinkedQueue 是无界非阻塞队列,基于链表实现,使用了无锁算法提高并发性能。不允许存放 null 元素
TODO: 用到再更新...
Reference
JAVA中常见的阻塞队列详解 阻塞队列与非阻塞队列区别应用场景 并发阻塞队列和非阻塞队列详解 java阻塞队列与非阻塞队列 Java Queue接口常用实现类